package dslab.transfer;

import dslab.core.DMTPHandler;
import dslab.core.DMTPProtocol;
import dslab.nameserver.INameserverRemote;
import dslab.util.*;

import java.io.*;
import java.net.InetAddress;
import java.net.Socket;
import java.rmi.RemoteException;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;

/**
 * transfer related Handler class that is runnable and should run in its own thread;
 * implements the DMTPProtocol, which forces it to implement all methods that are required to deal with DMTP;
 * re-uses a great deal of code from DMTPHandler
 */
public class TransferDMTPHandler extends DMTPHandler implements Runnable, DMTPProtocol {

    private final Config domains;
    private final BlockingQueue<Runnable> outboundSendTasks;
    private final Pair<InetAddress, Integer> monitoring;
    private final INameserverRemote root;

    /**
     * creates a new TransferDMTPHandler client connection handler
     * @param componentId componentId of transfer handler
     * @param socket client connection socket to handle
     * @param domains domains configuration for static name resolution
     * @param outboundSendTasks queue to append new send tasks to
     */
    public TransferDMTPHandler(String componentId, Socket socket,
                               Config domains, BlockingQueue<Runnable> outboundSendTasks,
                               Pair<InetAddress, Integer> monitoring, INameserverRemote root) throws IOException {
        super(componentId, socket);
        this.domains = domains;
        this.outboundSendTasks = outboundSendTasks;
        this.monitoring = monitoring;
        this.root = root;
    }

    @Override
    public void handleTo(Writer out, List<String> addresses, String addressesString) throws IOException {
        if (isNotInState(out, SessionState.EditingMessage, ILLEGAL_STATE_WARNING))
            return;

        // validate each address
        // no user validation, that is the responsibility of the mailbox
        for (String address : addresses) {
            // validate domain
            String mailbox = address.split("@")[1];
            if (isUnknownMailbox(mailbox)) {
                // exit preemptively in failure case
                raiseWarning(out, String.format("error unknown mailbox %s%n", mailbox));
                return;
            }
        }

        // communicate result
        out.write(String.format("ok %d%n", addresses.size()));
        out.flush();

        message.recipients = addressesString;
    }

    @Override
    public void handleFrom(Writer out, String address) throws IOException {
        if (isNotInState(out, SessionState.EditingMessage, ILLEGAL_STATE_WARNING))
            return;

        // validate domain
        String mailbox = address.split("@")[1];
        if (isUnknownMailbox(mailbox)) {
            // exit preemptively in failure case
            raiseWarning(out, String.format("error unknown mailbox %s%n", mailbox));
            return;
        }

        // communicate result
        out.write(String.format("ok%n"));
        out.flush();
        message.sender = address;
    }

    @Override
    public void handleSend(Writer out) throws IOException, InterruptedException {
        if (isNotInState(out, SessionState.EditingMessage, ILLEGAL_STATE_WARNING))
            return;

        if (!message.isMessageIncomplete()) {
            raiseWarning(out, String.format("error %s%n", message.getMissingFields()));
            return;
        }

        // recipient string is already validated
        Map<String, List<String>> groupedRecipients = Arrays
                .stream(message.recipients.split(","))
                .collect(Collectors.groupingBy(r -> r.split("@")[1]));
        System.out.printf("@%s: recipients grouped as follows %s%n", componentId, groupedRecipients.toString());
        // for each mailbox server send a message
        for (String domain : groupedRecipients.keySet()) {
            // notice: scrapped
            // edit message to contain only grouped recipients
            //Message mailboxSpecificMessage = new Message(message);
            //mailboxSpecificMessage.recipients = String.join(",", groupedRecipients.get(domain));

            // spawn client to send email
            DMTPClient dmtpClient = new DMTPClient(
                String.format("%s-sender-%s", componentId, domain),
                domain,
                new Message(message),
                domains,
                monitoring,
                true,
                root
            );
            outboundSendTasks.put(dmtpClient);
        }

        // communicate result
        out.write(String.format("ok%n"));
        out.flush();
        state = SessionState.NoMessages;
        message.reset();
    }

    /**
     * validates whether a mailbox can be looked up or not
     * @param mailbox mailbox to query
     * @return whether mailbox is known or not
     */
    private boolean isUnknownMailbox(String mailbox) {
        return !domains.containsKey(mailbox);
    }
}
